home *** CD-ROM | disk | FTP | other *** search
/ Freelog 100 / FreelogNo100-NovembreDecembre2010.iso / Musique / solfege / solfege-win32-3.17.0.exe / {app} / bin / Lib / bsddb / test / test_dbshelve.py < prev    next >
Text File  |  2007-12-10  |  10KB  |  359 lines

  1. """
  2. TestCases for checking dbShelve objects.
  3. """
  4.  
  5. import sys, os, string
  6. import tempfile, random
  7. from pprint import pprint
  8. from types import *
  9. import unittest
  10.  
  11. try:
  12.     # For Pythons w/distutils pybsddb
  13.     from bsddb3 import db, dbshelve
  14. except ImportError:
  15.     # For Python 2.3
  16.     from bsddb import db, dbshelve
  17.  
  18. from test_all import verbose
  19.  
  20.  
  21. #----------------------------------------------------------------------
  22.  
  23. # We want the objects to be comparable so we can test dbshelve.values
  24. # later on.
  25. class DataClass:
  26.     def __init__(self):
  27.         self.value = random.random()
  28.  
  29.     def __cmp__(self, other):
  30.         return cmp(self.value, other)
  31.  
  32. class DBShelveTestCase(unittest.TestCase):
  33.     def setUp(self):
  34.         self.filename = tempfile.mktemp()
  35.         self.do_open()
  36.  
  37.     def tearDown(self):
  38.         self.do_close()
  39.         try:
  40.             os.remove(self.filename)
  41.         except os.error:
  42.             pass
  43.  
  44.     def mk(self, key):
  45.         """Turn key into an appropriate key type for this db"""
  46.         # override in child class for RECNO
  47.         return key
  48.  
  49.     def populateDB(self, d):
  50.         for x in string.letters:
  51.             d[self.mk('S' + x)] = 10 * x           # add a string
  52.             d[self.mk('I' + x)] = ord(x)           # add an integer
  53.             d[self.mk('L' + x)] = [x] * 10         # add a list
  54.  
  55.             inst = DataClass()            # add an instance
  56.             inst.S = 10 * x
  57.             inst.I = ord(x)
  58.             inst.L = [x] * 10
  59.             d[self.mk('O' + x)] = inst
  60.  
  61.  
  62.     # overridable in derived classes to affect how the shelf is created/opened
  63.     def do_open(self):
  64.         self.d = dbshelve.open(self.filename)
  65.  
  66.     # and closed...
  67.     def do_close(self):
  68.         self.d.close()
  69.  
  70.  
  71.  
  72.     def test01_basics(self):
  73.         if verbose:
  74.             print '\n', '-=' * 30
  75.             print "Running %s.test01_basics..." % self.__class__.__name__
  76.  
  77.         self.populateDB(self.d)
  78.         self.d.sync()
  79.         self.do_close()
  80.         self.do_open()
  81.         d = self.d
  82.  
  83.         l = len(d)
  84.         k = d.keys()
  85.         s = d.stat()
  86.         f = d.fd()
  87.  
  88.         if verbose:
  89.             print "length:", l
  90.             print "keys:", k
  91.             print "stats:", s
  92.  
  93.         assert 0 == d.has_key(self.mk('bad key'))
  94.         assert 1 == d.has_key(self.mk('IA'))
  95.         assert 1 == d.has_key(self.mk('OA'))
  96.  
  97.         d.delete(self.mk('IA'))
  98.         del d[self.mk('OA')]
  99.         assert 0 == d.has_key(self.mk('IA'))
  100.         assert 0 == d.has_key(self.mk('OA'))
  101.         assert len(d) == l-2
  102.  
  103.         values = []
  104.         for key in d.keys():
  105.             value = d[key]
  106.             values.append(value)
  107.             if verbose:
  108.                 print "%s: %s" % (key, value)
  109.             self.checkrec(key, value)
  110.  
  111.         dbvalues = d.values()
  112.         assert len(dbvalues) == len(d.keys())
  113.         values.sort()
  114.         dbvalues.sort()
  115.         assert values == dbvalues
  116.  
  117.         items = d.items()
  118.         assert len(items) == len(values)
  119.  
  120.         for key, value in items:
  121.             self.checkrec(key, value)
  122.  
  123.         assert d.get(self.mk('bad key')) == None
  124.         assert d.get(self.mk('bad key'), None) == None
  125.         assert d.get(self.mk('bad key'), 'a string') == 'a string'
  126.         assert d.get(self.mk('bad key'), [1, 2, 3]) == [1, 2, 3]
  127.  
  128.         d.set_get_returns_none(0)
  129.         self.assertRaises(db.DBNotFoundError, d.get, self.mk('bad key'))
  130.         d.set_get_returns_none(1)
  131.  
  132.         d.put(self.mk('new key'), 'new data')
  133.         assert d.get(self.mk('new key')) == 'new data'
  134.         assert d[self.mk('new key')] == 'new data'
  135.  
  136.  
  137.  
  138.     def test02_cursors(self):
  139.         if verbose:
  140.             print '\n', '-=' * 30
  141.             print "Running %s.test02_cursors..." % self.__class__.__name__
  142.  
  143.         self.populateDB(self.d)
  144.         d = self.d
  145.  
  146.         count = 0
  147.         c = d.cursor()
  148.         rec = c.first()
  149.         while rec is not None:
  150.             count = count + 1
  151.             if verbose:
  152.                 print rec
  153.             key, value = rec
  154.             self.checkrec(key, value)
  155.             rec = c.next()
  156.         del c
  157.  
  158.         assert count == len(d)
  159.  
  160.         count = 0
  161.         c = d.cursor()
  162.         rec = c.last()
  163.         while rec is not None:
  164.             count = count + 1
  165.             if verbose:
  166.                 print rec
  167.             key, value = rec
  168.             self.checkrec(key, value)
  169.             rec = c.prev()
  170.  
  171.         assert count == len(d)
  172.  
  173.         c.set(self.mk('SS'))
  174.         key, value = c.current()
  175.         self.checkrec(key, value)
  176.         del c
  177.  
  178.  
  179.     def test03_append(self):
  180.         # NOTE: this is overridden in RECNO subclass, don't change its name.
  181.         if verbose:
  182.             print '\n', '-=' * 30
  183.             print "Running %s.test03_append..." % self.__class__.__name__
  184.  
  185.         self.assertRaises(dbshelve.DBShelveError,
  186.                           self.d.append, 'unit test was here')
  187.  
  188.  
  189.     def checkrec(self, key, value):
  190.         # override this in a subclass if the key type is different
  191.         x = key[1]
  192.         if key[0] == 'S':
  193.             assert type(value) == StringType
  194.             assert value == 10 * x
  195.  
  196.         elif key[0] == 'I':
  197.             assert type(value) == IntType
  198.             assert value == ord(x)
  199.  
  200.         elif key[0] == 'L':
  201.             assert type(value) == ListType
  202.             assert value == [x] * 10
  203.  
  204.         elif key[0] == 'O':
  205.             assert type(value) == InstanceType
  206.             assert value.S == 10 * x
  207.             assert value.I == ord(x)
  208.             assert value.L == [x] * 10
  209.  
  210.         else:
  211.             raise AssertionError, 'Unknown key type, fix the test'
  212.  
  213. #----------------------------------------------------------------------
  214.  
  215. class BasicShelveTestCase(DBShelveTestCase):
  216.     def do_open(self):
  217.         self.d = dbshelve.DBShelf()
  218.         self.d.open(self.filename, self.dbtype, self.dbflags)
  219.  
  220.     def do_close(self):
  221.         self.d.close()
  222.  
  223.  
  224. class BTreeShelveTestCase(BasicShelveTestCase):
  225.     dbtype = db.DB_BTREE
  226.     dbflags = db.DB_CREATE
  227.  
  228.  
  229. class HashShelveTestCase(BasicShelveTestCase):
  230.     dbtype = db.DB_HASH
  231.     dbflags = db.DB_CREATE
  232.  
  233.  
  234. class ThreadBTreeShelveTestCase(BasicShelveTestCase):
  235.     dbtype = db.DB_BTREE
  236.     dbflags = db.DB_CREATE | db.DB_THREAD
  237.  
  238.  
  239. class ThreadHashShelveTestCase(BasicShelveTestCase):
  240.     dbtype = db.DB_HASH
  241.     dbflags = db.DB_CREATE | db.DB_THREAD
  242.  
  243.  
  244. #----------------------------------------------------------------------
  245.  
  246. class BasicEnvShelveTestCase(DBShelveTestCase):
  247.     def do_open(self):
  248.         self.homeDir = homeDir = os.path.join(
  249.             os.path.dirname(sys.argv[0]), 'db_home')
  250.         try: os.mkdir(homeDir)
  251.         except os.error: pass
  252.         self.env = db.DBEnv()
  253.         self.env.open(homeDir, self.envflags | db.DB_INIT_MPOOL | db.DB_CREATE)
  254.  
  255.         self.filename = os.path.split(self.filename)[1]
  256.         self.d = dbshelve.DBShelf(self.env)
  257.         self.d.open(self.filename, self.dbtype, self.dbflags)
  258.  
  259.  
  260.     def do_close(self):
  261.         self.d.close()
  262.         self.env.close()
  263.  
  264.  
  265.     def tearDown(self):
  266.         self.do_close()
  267.         import glob
  268.         files = glob.glob(os.path.join(self.homeDir, '*'))
  269.         for file in files:
  270.             os.remove(file)
  271.  
  272.  
  273.  
  274. class EnvBTreeShelveTestCase(BasicEnvShelveTestCase):
  275.     envflags = 0
  276.     dbtype = db.DB_BTREE
  277.     dbflags = db.DB_CREATE
  278.  
  279.  
  280. class EnvHashShelveTestCase(BasicEnvShelveTestCase):
  281.     envflags = 0
  282.     dbtype = db.DB_HASH
  283.     dbflags = db.DB_CREATE
  284.  
  285.  
  286. class EnvThreadBTreeShelveTestCase(BasicEnvShelveTestCase):
  287.     envflags = db.DB_THREAD
  288.     dbtype = db.DB_BTREE
  289.     dbflags = db.DB_CREATE | db.DB_THREAD
  290.  
  291.  
  292. class EnvThreadHashShelveTestCase(BasicEnvShelveTestCase):
  293.     envflags = db.DB_THREAD
  294.     dbtype = db.DB_HASH
  295.     dbflags = db.DB_CREATE | db.DB_THREAD
  296.  
  297.  
  298. #----------------------------------------------------------------------
  299. # test cases for a DBShelf in a RECNO DB.
  300.  
  301. class RecNoShelveTestCase(BasicShelveTestCase):
  302.     dbtype = db.DB_RECNO
  303.     dbflags = db.DB_CREATE
  304.  
  305.     def setUp(self):
  306.         BasicShelveTestCase.setUp(self)
  307.  
  308.         # pool to assign integer key values out of
  309.         self.key_pool = list(range(1, 5000))
  310.         self.key_map = {}     # map string keys to the number we gave them
  311.         self.intkey_map = {}  # reverse map of above
  312.  
  313.     def mk(self, key):
  314.         if key not in self.key_map:
  315.             self.key_map[key] = self.key_pool.pop(0)
  316.             self.intkey_map[self.key_map[key]] = key
  317.         return self.key_map[key]
  318.  
  319.     def checkrec(self, intkey, value):
  320.         key = self.intkey_map[intkey]
  321.         BasicShelveTestCase.checkrec(self, key, value)
  322.  
  323.     def test03_append(self):
  324.         if verbose:
  325.             print '\n', '-=' * 30
  326.             print "Running %s.test03_append..." % self.__class__.__name__
  327.  
  328.         self.d[1] = 'spam'
  329.         self.d[5] = 'eggs'
  330.         self.assertEqual(6, self.d.append('spam'))
  331.         self.assertEqual(7, self.d.append('baked beans'))
  332.         self.assertEqual('spam', self.d.get(6))
  333.         self.assertEqual('spam', self.d.get(1))
  334.         self.assertEqual('baked beans', self.d.get(7))
  335.         self.assertEqual('eggs', self.d.get(5))
  336.  
  337.  
  338. #----------------------------------------------------------------------
  339.  
  340. def test_suite():
  341.     suite = unittest.TestSuite()
  342.  
  343.     suite.addTest(unittest.makeSuite(DBShelveTestCase))
  344.     suite.addTest(unittest.makeSuite(BTreeShelveTestCase))
  345.     suite.addTest(unittest.makeSuite(HashShelveTestCase))
  346.     suite.addTest(unittest.makeSuite(ThreadBTreeShelveTestCase))
  347.     suite.addTest(unittest.makeSuite(ThreadHashShelveTestCase))
  348.     suite.addTest(unittest.makeSuite(EnvBTreeShelveTestCase))
  349.     suite.addTest(unittest.makeSuite(EnvHashShelveTestCase))
  350.     suite.addTest(unittest.makeSuite(EnvThreadBTreeShelveTestCase))
  351.     suite.addTest(unittest.makeSuite(EnvThreadHashShelveTestCase))
  352.     suite.addTest(unittest.makeSuite(RecNoShelveTestCase))
  353.  
  354.     return suite
  355.  
  356.  
  357. if __name__ == '__main__':
  358.     unittest.main(defaultTest='test_suite')
  359.